Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35014] SqlNode to operation conversion for models #25834

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

lihaosky
Copy link
Contributor

What is the purpose of the change

Add SqlNode to SqlOperation conversion for models

Brief change log

  • Support Model CRUD node to operation conversion

Verifying this change

This change added tests and can be verified as follows:

  • Added unit test and integration test in TableEnvironmentTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (JavaDocs)

@lihaosky
Copy link
Contributor Author

cc @twalthr to review. Thanks!

@flinkbot
Copy link
Collaborator

flinkbot commented Dec 20, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@lihaosky
Copy link
Contributor Author

2024-12-20T21:31:47.4797792Z Dec 20 21:31:47 21:31:47.478 [INFO] Running org.apache.flink.table.planner.plan.rules.physical.stream.WatermarkAssignerChangelogNormalizeTransposeRuleTest
2024-12-20T21:31:47.7285003Z Dec 20 21:31:47 21:31:47.726 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 2.865 s <<< FAILURE! -- in org.apache.flink.architecture.rules.BanJunit4Rules
2024-12-20T21:31:47.7287086Z Dec 20 21:31:47 21:31:47.726 [ERROR] BanJunit4Rules.NO_NEW_ADDED_JUNIT4_TEST_RULE -- Time elapsed: 2.860 s <<< FAILURE!
2024-12-20T21:31:47.7288102Z Dec 20 21:31:47 java.lang.AssertionError: 
2024-12-20T21:31:47.7289605Z Dec 20 21:31:47 Architecture Violation [Priority: MEDIUM] - Rule 'Junit4 is forbidden, please use Junit5 instead' was violated (2 times):
2024-12-20T21:31:47.7291813Z Dec 20 21:31:47 Method <org.apache.flink.table.planner.operations.SqlDdlToOperationConverterTest.testCreateModel()> calls method <org.junit.Assert.assertNotNull(java.lang.Object)> in (SqlDdlToOperationConverterTest.java:296)
2024-12-20T21:31:47.7294137Z Dec 20 21:31:47 Method <org.apache.flink.table.planner.operations.SqlDdlToOperationConverterTest.testCreateModel()> calls method <org.junit.Assert.assertNotNull(java.lang.Object)> in (SqlDdlToOperationConverterTest.java:303)
2024-12-20T21:31:47.7295939Z Dec 20 21:31:47 	at com.tngtech.archunit.lang.ArchRule$Assertions.assertNoViolation(ArchRule.java:94)
2024-12-20T21:31:47.7297056Z Dec 20 21:31:47 	at com.tngtech.archunit.lang.ArchRule$Assertions.check(ArchRule.java:86)
2024-12-20T21:31:47.7298210Z Dec 20 21:31:47 	at com.tngtech.archunit.library.freeze.FreezingArchRule.check(FreezingArchRule.java:97)
2024-12-20T21:31:47.7301989Z Dec 20 21:31:47 	at com.tngtech.archunit.junit.internal.ArchUnitTestDescriptor$ArchUnitRuleDescriptor.execute(ArchUnitTestDescriptor.java:166)
2024-12-20T21:31:47.7303496Z Dec 20 21:31:47 	at com.tngtech.archunit.junit.internal.ArchUnitTestDescriptor$ArchUnitRuleDescriptor.execute(ArchUnitTestDescriptor.java:149)
2024-12-20T21:31:47.7317722Z Dec 20 21:31:47 	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
2024-12-20T21:31:47.7319141Z Dec 20 21:31:47 	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
2024-12-20T21:31:47.7320302Z Dec 20 21:31:47 	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)

@davidradl
Copy link
Contributor

Reviewed by Chi on 09/01/2025 Go back to the submitter with review comments.

@lihaosky
Copy link
Contributor Author

@snuyanzin , thanks for the review. Addressed your comments

@lihaosky
Copy link
Contributor Author

@flinkbot run azure

@lihaosky
Copy link
Contributor Author

@flinkbot run azure re-run the last Azure build

@snuyanzin
Copy link
Contributor

@flinkbot run azure

@lihaosky
Copy link
Contributor Author

Looks the CI run is always posted in #25834 (comment)

Comment on lines 1530 to 1532
ResolvedCatalogModel resolvedModel =
newModel == null ? null : resolveCatalogModel(newModel);
catalog.alterModel(path, resolvedModel, modelChanges, ignoreIfNotExists);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there is a reason to pass non-existing model as a null further rather than applying similar behavior as for alterTable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In altertable call, if the table doesn't exist, the operation is a NopOperation: https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java#L409. This leads to problem where we don't know the original sql operation and properly give error message etc. So in alterModel it's still altermodelOperation and null indicates original model doesn't exist. It can be handled by downstream

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot simply pass nulls if this is not documented in the Catalog JavaDoc. Why should we allow alter model if it doesn't exist? The catalog manager or higher layers can deal with the problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

made it unresolved again since the problem is not solved yet

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Handle non exist model in AlterModelChangeOperation#execute() method

Comment on lines 1530 to 1532
ResolvedCatalogModel resolvedModel =
newModel == null ? null : resolveCatalogModel(newModel);
catalog.alterModel(path, resolvedModel, modelChanges, ignoreIfNotExists);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot simply pass nulls if this is not documented in the Catalog JavaDoc. Why should we allow alter model if it doesn't exist? The catalog manager or higher layers can deal with the problem.

Map<String, String> changeModelOptions =
OperationConverterUtils.extractProperties(sqlAlterModelSet.getOptionList());
if (changeModelOptions.isEmpty()) {
throw new ValidationException("ALTER MODEL SET does not support empty option");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as above:
For tables we also check for immutable connector option. Do we want to do the same for models?

Copy link
Contributor

@snuyanzin snuyanzin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed at least one issue which should be mitigated before merge
https://github.com/apache/flink/pull/25834/files#r1973674359

@lihaosky lihaosky requested a review from snuyanzin April 1, 2025 04:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants